// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importutil

import (
	"context"
	"encoding/json"
	"fmt"
	"path"
	"runtime/debug"
	"strconv"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type (
	BlockData map[storage.FieldID]storage.FieldData // a map of field ID to field data
	ShardData map[int64]BlockData                   // a map of partition ID to block data
)

func isCanceled(ctx context.Context) bool {
	// canceled?
	select {
	case <-ctx.Done():
		return true
	default:
		break
	}
	return false
}

func initBlockData(collectionSchema *schemapb.CollectionSchema) BlockData {
	blockData := make(BlockData)
	// rowID field is a hidden field with fieldID=0, it is always auto-generated by IDAllocator
	// if primary key is int64 and autoID=true, primary key field is equal to rowID field
	blockData[common.RowIDField] = &storage.Int64FieldData{
		Data: make([]int64, 0),
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		switch schema.DataType {
		case schemapb.DataType_Bool:
			blockData[schema.GetFieldID()] = &storage.BoolFieldData{
				Data: make([]bool, 0),
			}
		case schemapb.DataType_Float:
			blockData[schema.GetFieldID()] = &storage.FloatFieldData{
				Data: make([]float32, 0),
			}
		case schemapb.DataType_Double:
			blockData[schema.GetFieldID()] = &storage.DoubleFieldData{
				Data: make([]float64, 0),
			}
		case schemapb.DataType_Int8:
			blockData[schema.GetFieldID()] = &storage.Int8FieldData{
				Data: make([]int8, 0),
			}
		case schemapb.DataType_Int16:
			blockData[schema.GetFieldID()] = &storage.Int16FieldData{
				Data: make([]int16, 0),
			}
		case schemapb.DataType_Int32:
			blockData[schema.GetFieldID()] = &storage.Int32FieldData{
				Data: make([]int32, 0),
			}
		case schemapb.DataType_Int64:
			blockData[schema.GetFieldID()] = &storage.Int64FieldData{
				Data: make([]int64, 0),
			}
		case schemapb.DataType_BinaryVector:
			dim, _ := getFieldDimension(schema)
			blockData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
				Data: make([]byte, 0),
				Dim:  dim,
			}
		case schemapb.DataType_FloatVector:
			dim, _ := getFieldDimension(schema)
			blockData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
				Data: make([]float32, 0),
				Dim:  dim,
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			blockData[schema.GetFieldID()] = &storage.StringFieldData{
				Data: make([]string, 0),
			}
		case schemapb.DataType_JSON:
			blockData[schema.GetFieldID()] = &storage.JSONFieldData{
				Data: make([][]byte, 0),
			}
		case schemapb.DataType_Array:
			blockData[schema.GetFieldID()] = &storage.ArrayFieldData{
				Data:        make([]*schemapb.ScalarField, 0),
				ElementType: schema.GetElementType(),
			}
		default:
			log.Warn("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
			return nil
		}
	}

	return blockData
}

func initShardData(collectionSchema *schemapb.CollectionSchema, partitionIDs []int64) ShardData {
	shardData := make(ShardData)
	for i := 0; i < len(partitionIDs); i++ {
		blockData := initBlockData(collectionSchema)
		if blockData == nil {
			return nil
		}
		shardData[partitionIDs[i]] = blockData
	}

	return shardData
}

func parseFloat(s string, bitsize int, fieldName string) (float64, error) {
	value, err := strconv.ParseFloat(s, bitsize)
	if err != nil {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%s' for field '%s', error: %v", s, fieldName, err))
	}

	err = typeutil.VerifyFloat(value)
	if err != nil {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%s' for field '%s', error: %v", s, fieldName, err))
	}

	return value, nil
}

// Validator is field value validator
type Validator struct {
	convertFunc func(obj interface{}, field storage.FieldData) error // convert data function
	primaryKey  bool                                                 // true for primary key
	autoID      bool                                                 // only for primary key field
	isString    bool                                                 // for string field
	dimension   int                                                  // only for vector field
	fieldName   string                                               // field name
	fieldID     int64                                                // field ID
}

// initValidators constructs valiator methods and data conversion methods
func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error {
	if collectionSchema == nil {
		return merr.WrapErrImportFailed("collection schema is nil")
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]

		validators[schema.GetFieldID()] = &Validator{}
		validators[schema.GetFieldID()].primaryKey = schema.GetIsPrimaryKey()
		validators[schema.GetFieldID()].autoID = schema.GetAutoID()
		validators[schema.GetFieldID()].fieldName = schema.GetName()
		validators[schema.GetFieldID()].fieldID = schema.GetFieldID()
		validators[schema.GetFieldID()].isString = false

		switch schema.DataType {
		case schemapb.DataType_Bool:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if value, ok := obj.(bool); ok {
					field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for bool type field '%s'", obj, schema.GetName()))
				}

				return nil
			}
		case schemapb.DataType_Float:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := parseFloat(string(num), 32, schema.GetName())
					if err != nil {
						return err
					}
					field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float type field '%s'", obj, schema.GetName()))
				}

				return nil
			}
		case schemapb.DataType_Double:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := parseFloat(string(num), 64, schema.GetName())
					if err != nil {
						return err
					}
					field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for double type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int8:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 8)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int8 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int16:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 16)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int16 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int32:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 32)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int32 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int64:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 64)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int64 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_BinaryVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for binary vector field '%s'", obj, schema.GetName()))
				}
				// we use uint8 to represent binary vector in json file, each uint8 value represents 8 dimensions.
				if len(arr)*8 != dim {
					return merr.WrapErrImportFailed(fmt.Sprintf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(arr)*8, dim, schema.GetName()))
				}

				for i := 0; i < len(arr); i++ {
					if num, ok := arr[i].(json.Number); ok {
						value, err := strconv.ParseUint(string(num), 0, 8)
						if err != nil {
							return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for binary vector field '%s', error: %v", num, schema.GetName(), err))
						}
						field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value))
					} else {
						return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for binary vector field '%s'", obj, schema.GetName()))
					}
				}

				return nil
			}
		case schemapb.DataType_FloatVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for float vector field '%s'", obj, schema.GetName()))
				}
				if len(arr) != dim {
					return merr.WrapErrImportFailed(fmt.Sprintf("array size %d doesn't equal to vector dimension %d of field '%s'", len(arr), dim, schema.GetName()))
				}

				for i := 0; i < len(arr); i++ {
					if num, ok := arr[i].(json.Number); ok {
						value, err := parseFloat(string(num), 32, schema.GetName())
						if err != nil {
							return err
						}
						field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value))
					} else {
						return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float vector field '%s'", obj, schema.GetName()))
					}
				}

				return nil
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			validators[schema.GetFieldID()].isString = true

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if value, ok := obj.(string); ok {
					field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_JSON:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				// for JSON data, we accept two kinds input: string and map[string]interface
				// user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}}
				if value, ok := obj.(string); ok {
					var dummy interface{}
					err := json.Unmarshal([]byte(value), &dummy)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for JSON field '%s', error: %v", value, schema.GetName(), err))
					}
					field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
				} else if mp, ok := obj.(map[string]interface{}); ok {
					bs, err := json.Marshal(mp)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value for JSON field '%s', error: %v", schema.GetName(), err))
					}
					field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, bs)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Array:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for array field '%s'", obj, schema.GetName()))
				}
				return getArrayElementData(schema, arr, field)
			}
		default:
			return merr.WrapErrImportFailed(fmt.Sprintf("unsupport data type: %s", getTypeName(collectionSchema.Fields[i].DataType)))
		}
	}

	return nil
}

func getArrayElementData(schema *schemapb.FieldSchema, arr []interface{}, field storage.FieldData) error {
	switch schema.GetElementType() {
	case schemapb.DataType_Bool:
		boolData := make([]bool, 0)
		for i := 0; i < len(arr); i++ {
			if value, ok := arr[i].(bool); ok {
				boolData = append(boolData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for bool array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_BoolData{
				BoolData: &schemapb.BoolArray{
					Data: boolData,
				},
			},
		})
	case schemapb.DataType_Int8:
		int8Data := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 8)
				if err != nil {
					return err
				}
				int8Data = append(int8Data, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: int8Data,
				},
			},
		})

	case schemapb.DataType_Int16:
		int16Data := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 16)
				if err != nil {
					return err
				}
				int16Data = append(int16Data, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: int16Data,
				},
			},
		})
	case schemapb.DataType_Int32:
		intData := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 32)
				if err != nil {
					return err
				}
				intData = append(intData, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: intData,
				},
			},
		})
	case schemapb.DataType_Int64:
		longData := make([]int64, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 64)
				if err != nil {
					return err
				}
				longData = append(longData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for long array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_LongData{
				LongData: &schemapb.LongArray{
					Data: longData,
				},
			},
		})
	case schemapb.DataType_Float:
		floatData := make([]float32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := parseFloat(string(num), 32, schema.GetName())
				if err != nil {
					return err
				}
				floatData = append(floatData, float32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_FloatData{
				FloatData: &schemapb.FloatArray{
					Data: floatData,
				},
			},
		})
	case schemapb.DataType_Double:
		doubleData := make([]float64, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := parseFloat(string(num), 32, schema.GetName())
				if err != nil {
					return err
				}
				doubleData = append(doubleData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for double array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_DoubleData{
				DoubleData: &schemapb.DoubleArray{
					Data: doubleData,
				},
			},
		})
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		stringFieldData := &schemapb.ScalarField{
			Data: &schemapb.ScalarField_StringData{
				StringData: &schemapb.StringArray{
					Data: make([]string, 0),
				},
			},
		}
		for i := 0; i < len(arr); i++ {
			if str, ok := arr[i].(string); ok {
				stringFieldData.GetStringData().Data = append(stringFieldData.GetStringData().Data, str)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for string array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, stringFieldData)
	default:
		return merr.WrapErrImportFailed(fmt.Sprintf("unsupport element type: %v", getTypeName(schema.GetElementType())))
	}
	return nil
}

func printFieldsDataInfo(fieldsData BlockData, msg string, files []string) {
	stats := make([]zapcore.Field, 0)
	for k, v := range fieldsData {
		stats = append(stats, zap.Int(strconv.FormatInt(k, 10), v.RowNum()))
	}

	if len(files) > 0 {
		stats = append(stats, zap.Any(Files, files))
	}
	log.Info(msg, stats...)
}

// GetFileNameAndExt extracts file name and extension
// for example: "/a/b/c.ttt" returns "c" and ".ttt"
func GetFileNameAndExt(filePath string) (string, string) {
	fileName := path.Base(filePath)
	fileType := path.Ext(fileName)
	fileNameWithoutExt := strings.TrimSuffix(fileName, fileType)
	return fileNameWithoutExt, fileType
}

// getFieldDimension gets dimension of vecotor field
func getFieldDimension(schema *schemapb.FieldSchema) (int, error) {
	for _, kvPair := range schema.GetTypeParams() {
		key, value := kvPair.GetKey(), kvPair.GetValue()
		if key == common.DimKey {
			dim, err := strconv.Atoi(value)
			if err != nil {
				return 0, merr.WrapErrImportFailed(fmt.Sprintf("illegal vector dimension '%s' for field '%s', error: %v", value, schema.GetName(), err))
			}
			return dim, nil
		}
	}

	return 0, merr.WrapErrImportFailed(fmt.Sprintf("vector dimension is not defined for field '%s'", schema.GetName()))
}

// triggerGC triggers golang gc to return all free memory back to the underlying system at once,
// Note: this operation is expensive, and can lead to latency spikes as it holds the heap lock through the whole process
func triggerGC() {
	debug.FreeOSMemory()
}

// if user didn't provide dynamic data, fill the dynamic field by "{}"
func fillDynamicData(blockData BlockData, collectionSchema *schemapb.CollectionSchema) error {
	if !collectionSchema.GetEnableDynamicField() {
		return nil
	}

	dynamicFieldID := int64(-1)
	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		if schema.GetIsDynamic() {
			dynamicFieldID = schema.GetFieldID()
			break
		}
	}

	if dynamicFieldID < 0 {
		return merr.WrapErrImportFailed("the collection schema is dynamic but dynamic field is not found")
	}

	rowCount := 0
	if len(blockData) > 0 {
		for id, v := range blockData {
			if id == dynamicFieldID {
				continue
			}
			rowCount = v.RowNum()
		}
	}

	dynamicData, ok := blockData[dynamicFieldID]
	if !ok || dynamicData == nil {
		// dynamic field data is not provided, create new one
		dynamicData = &storage.JSONFieldData{
			Data: make([][]byte, 0),
		}
	}

	if dynamicData.RowNum() < rowCount {
		// fill the dynamic data by an empty JSON object, make sure the row count is eaual to other fields
		data := dynamicData.(*storage.JSONFieldData)
		bs := []byte("{}")
		dynamicRowCount := dynamicData.RowNum()
		for i := 0; i < rowCount-dynamicRowCount; i++ {
			data.Data = append(data.Data, bs)
		}
	}

	blockData[dynamicFieldID] = dynamicData

	return nil
}

// tryFlushBlocks does the two things:
// 1. if accumulate data of a block exceed blockSize, call callFlushFunc to generate new binlog file
// 2. if total accumulate data exceed maxTotalSize, call callFlushFunc to flush the biggest block
func tryFlushBlocks(ctx context.Context,
	shardsData []ShardData,
	collectionSchema *schemapb.CollectionSchema,
	callFlushFunc ImportFlushFunc,
	blockSize int64,
	maxTotalSize int64,
	force bool,
) error {
	totalSize := 0
	biggestSize := 0
	biggestItem := -1
	biggestPartition := int64(-1)

	// 1. if accumulate data of a block exceed blockSize, call callFlushFunc to generate new binlog file
	for i := 0; i < len(shardsData); i++ {
		// outside context might be canceled(service stop, or future enhancement for canceling import task)
		if isCanceled(ctx) {
			log.Warn("Import util: import task was canceled")
			return merr.WrapErrImportFailed("import task was canceled")
		}

		shardData := shardsData[i]
		for partitionID, blockData := range shardData {
			err := fillDynamicData(blockData, collectionSchema)
			if err != nil {
				log.Warn("Import util: failed to fill dynamic field", zap.Error(err))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to fill dynamic field, error: %v", err))
			}

			// Note: even rowCount is 0, the size is still non-zero
			size := 0
			rowCount := 0
			for _, fieldData := range blockData {
				size += fieldData.GetMemorySize()
				rowCount = fieldData.RowNum()
			}

			// force to flush, called at the end of Read()
			if force && rowCount > 0 {
				printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
				err := callFlushFunc(blockData, i, partitionID)
				if err != nil {
					log.Warn("Import util: failed to force flush block data", zap.Int("shardID", i),
						zap.Int64("partitionID", partitionID), zap.Error(err))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to force flush block data for shard id %d to partition %d, error: %v", i, partitionID, err))
				}
				log.Info("Import util: force flush", zap.Int("rowCount", rowCount), zap.Int("size", size),
					zap.Int("shardID", i), zap.Int64("partitionID", partitionID))

				shardData[partitionID] = initBlockData(collectionSchema)
				if shardData[partitionID] == nil {
					log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", i), zap.Int64("partitionID", partitionID))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", i, partitionID))
				}
				continue
			}

			// if segment size is larger than predefined blockSize, flush to create a new binlog file
			// initialize a new FieldData list for next round batch read
			if size > int(blockSize) && rowCount > 0 {
				printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
				err := callFlushFunc(blockData, i, partitionID)
				if err != nil {
					log.Warn("Import util: failed to flush block data", zap.Int("shardID", i),
						zap.Int64("partitionID", partitionID), zap.Error(err))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to flush block data for shard id %d to partition %d, error: %v", i, partitionID, err))
				}
				log.Info("Import util: block size exceed limit and flush", zap.Int("rowCount", rowCount), zap.Int("size", size),
					zap.Int("shardID", i), zap.Int64("partitionID", partitionID), zap.Int64("blockSize", blockSize))

				shardData[partitionID] = initBlockData(collectionSchema)
				if shardData[partitionID] == nil {
					log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", i), zap.Int64("partitionID", partitionID))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", i, partitionID))
				}
				continue
			}

			// calculate the total size(ignore the flushed blocks)
			// find out the biggest block for the step 2
			totalSize += size
			if size > biggestSize {
				biggestSize = size
				biggestItem = i
				biggestPartition = partitionID
			}
		}
	}

	// 2. if total accumulate data exceed maxTotalSize, call callFlushFUnc to flush the biggest block
	if totalSize > int(maxTotalSize) && biggestItem >= 0 && biggestPartition >= 0 {
		// outside context might be canceled(service stop, or future enhancement for canceling import task)
		if isCanceled(ctx) {
			log.Warn("Import util: import task was canceled")
			return merr.WrapErrImportFailed("import task was canceled")
		}

		blockData := shardsData[biggestItem][biggestPartition]
		err := fillDynamicData(blockData, collectionSchema)
		if err != nil {
			log.Warn("Import util: failed to fill dynamic field", zap.Error(err))
			return merr.WrapErrImportFailed(fmt.Sprintf("failed to fill dynamic field, error: %v", err))
		}

		// Note: even rowCount is 0, the size is still non-zero
		size := 0
		rowCount := 0
		for _, fieldData := range blockData {
			size += fieldData.GetMemorySize()
			rowCount = fieldData.RowNum()
		}

		if rowCount > 0 {
			printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
			err = callFlushFunc(blockData, biggestItem, biggestPartition)
			if err != nil {
				log.Warn("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem),
					zap.Int64("partitionID", biggestPartition))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to flush biggest block data for shard id %d to partition %d, error: %v",
					biggestItem, biggestPartition, err))
			}
			log.Info("Import util: total size exceed limit and flush", zap.Int("rowCount", rowCount),
				zap.Int("size", size), zap.Int("totalSize", totalSize), zap.Int("shardID", biggestItem))

			shardsData[biggestItem][biggestPartition] = initBlockData(collectionSchema)
			if shardsData[biggestItem][biggestPartition] == nil {
				log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", biggestItem),
					zap.Int64("partitionID", biggestPartition))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", biggestItem, biggestPartition))
			}
		}
	}

	return nil
}

func getTypeName(dt schemapb.DataType) string {
	switch dt {
	case schemapb.DataType_Bool:
		return "Bool"
	case schemapb.DataType_Int8:
		return "Int8"
	case schemapb.DataType_Int16:
		return "Int16"
	case schemapb.DataType_Int32:
		return "Int32"
	case schemapb.DataType_Int64:
		return "Int64"
	case schemapb.DataType_Float:
		return "Float"
	case schemapb.DataType_Double:
		return "Double"
	case schemapb.DataType_VarChar:
		return "Varchar"
	case schemapb.DataType_String:
		return "String"
	case schemapb.DataType_BinaryVector:
		return "BinaryVector"
	case schemapb.DataType_FloatVector:
		return "FloatVector"
	case schemapb.DataType_JSON:
		return "JSON"
	default:
		return "InvalidType"
	}
}

func pkToShard(pk interface{}, shardNum uint32) (uint32, error) {
	var shard uint32
	strPK, ok := pk.(string)
	if ok {
		hash := typeutil.HashString2Uint32(strPK)
		shard = hash % shardNum
	} else {
		intPK, ok := pk.(int64)
		if !ok {
			log.Warn("Numpy parser: primary key field must be int64 or varchar")
			return 0, merr.WrapErrImportFailed("primary key field must be int64 or varchar")
		}
		hash, _ := typeutil.Hash32Int64(intPK)
		shard = hash % shardNum
	}

	return shard, nil
}

func UpdateKVInfo(infos *[]*commonpb.KeyValuePair, k string, v string) error {
	if infos == nil {
		return merr.WrapErrImportFailed("Import util: kv array pointer is nil")
	}

	found := false
	for _, kv := range *infos {
		if kv.GetKey() == k {
			kv.Value = v
			found = true
		}
	}
	if !found {
		*infos = append(*infos, &commonpb.KeyValuePair{Key: k, Value: v})
	}

	return nil
}
